use std::{
    io::{self, Read, Write},
    sync::Arc,
    thread,
    time::Duration,
};

use manage::{ManageSiminkStatus, SiminkManage};
use mio::{net::TcpListener, Events, Interest, Poll, Token};
use monitor::Monitor;
use rcu::{rcu_register_thread, rcu_unregister_thread};

pub(crate) fn protocol_process(mon: Arc<Monitor>, manage: Arc<SiminkManage>, addr: String) {
    manage.clone().push_worker(
        thread::Builder::new()
            .name("tcp_protocol".to_string())
            .spawn(move || {
                rcu_register_thread();

                let mut listener = TcpListener::bind(addr.parse().unwrap()).unwrap();
                let mut poll = Poll::new().unwrap();
                poll.registry().register(&mut listener, Token(0), Interest::READABLE).unwrap();

                let mut events = Events::with_capacity(10);
                'out: loop {
                    poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();
                    for event in events.iter() {
                        if event.token() == Token(0) && event.is_readable() {
                            match listener.accept() {
                                Ok((mut stream, _)) => {
                                    let mut stream_poll = Poll::new().unwrap();
                                    stream_poll
                                        .registry()
                                        .register(&mut stream, Token(1), Interest::READABLE)
                                        .unwrap();
                                    let mut stream_events = Events::with_capacity(5);

                                    loop {
                                        stream_poll
                                            .poll(
                                                &mut stream_events,
                                                Some(Duration::from_millis(100)),
                                            )
                                            .unwrap();
                                        for ev in stream_events.iter() {
                                            let mut json = String::new();
                                            if ev.token() == Token(1) && ev.is_readable() {
                                                let mut buf = [0u8; 4096];
                                                let res = stream.read(&mut buf);
                                                match res {
                                                    Ok(size) => {
                                                        if size == 0 {
                                                            break;
                                                        }
                                                        let str = String::from_utf8(
                                                            buf[0..size].to_vec(),
                                                        );
                                                        if let Ok(s) = str {
                                                            json.push_str(s.as_str());
                                                        } else {
                                                            continue;
                                                        }
                                                    },
                                                    Err(_) => {
                                                        unimplemented!()
                                                    },
                                                }

                                                if json.len() <= 2 {
                                                    continue;
                                                }
                                                if json.ends_with('\n') {
                                                    let mut res = mon.protocol_exec(json.as_str());
                                                    res.push('\n');
                                                    let res = stream.write_all(res.as_bytes());
                                                    if res.is_err() {
                                                        unimplemented!();
                                                    }
                                                }
                                            }
                                        }
                                        if manage.simink_status() == ManageSiminkStatus::Quiting {
                                            // 可能连接被断开, 不用处理返回值
                                            let _ = stream.shutdown(std::net::Shutdown::Both);
                                            break 'out;
                                        }
                                    }
                                },
                                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                                    continue;
                                },
                                Err(_) => {
                                    unimplemented!();
                                },
                            }
                        }
                    }
                    // 检测系统状态
                    if manage.simink_status() == ManageSiminkStatus::Quiting {
                        break 'out;
                    }
                }
                rcu_unregister_thread();
            })
            .unwrap(),
    );
}
